Sistemas de Mensageria

Mensageria ou messaging é uma forma de comunicação presente em sistemas distribuídos realizada através da troca de mensagens (eventos), sendo essas mensagens gerenciadas por um message broker.

Mas... o que é um message broker?

De forma resumida, message broker é um intermediador: um componente que responsável por intermediar a troca de mensagens entre produtores (producers) e consumidores (consumers).

Producers e consumers, resumidamente, são sistemas de software que, respectivamente, enviam e recebem mensagens para/do message broker

Comunicação producer - broker - consumer

Fazendo uma analogia, um carteiro (message broker) é responsável por garantir que uma carta saia do remetente (producer) e chegue até o destinatário (consumer).

Componentes básicos do message broker

Existem diversos message brokers disponíveis no mercado, dentre eles, podemos citar como mais populares: RabbitMQ, Apache Kafka, Amazon SQS, Google Cloud Pub/Sub, dentre outros. No entanto, independente do fornecedor, normalmente encontraremos os seguintes elementos:

  • Producer - sistema que irá enviar (produzir) uma mensagemm
  • Consumer - sistema que irá consumir mensagens de uma fila
  • Queue ou topic - fila (ou tópico) para a qual a mensagem será enviada e armazenada
  • Exchange - Componente responsável por rotear as mensagens para suas respectivas filas e garantir uma distribuição uniforme entre os consumidores
  • Message ou event - A mensagem que está sendo transmitida entre produtor e consumidor.

Vantagens e Desvantagens em utilizar mensageria

Vantagens

  • A troca de mensagens entre produtor e consumidor pode ocorrer independente se o um ou outro estiver online, o message broker se encarregará de entregar a mensagem assim que o consumidor ficar ativo novamente.
  • Message brokers são capazes de garantir a entrega da mensagem, tornando o sistema mais confiável.
  • O processamento assíncrono ajuda a melhorar a performance da aplicação e experiência do usuário.
  • Message brokers são capazes de reenviar mensagens não entregues além de manter um registro das falhas para análise futura.

Desvantagens

  • A consistência entre os sistemas (produtor e consumidor) é eventual, ou seja, em determinado momento poderá haver inconsistência nos dados entre os sistemas.

Tá na hora do Código

Nota: Todos os exemplos a seguir utilizarão Rabbit MQ como message broker e typescript como linguagem de programação.

Nota2: Focaremos na implementação dos producers e consumers, portanto não entraremos em detalhes sobre a implementação das demais dependências nem a organização do código.

O que iremos fazer?

Para nosso exemplo iremos construir uma API que atuará como producer e consumer de mensagens. Iremos explorar todos os tipos de exchange disponíveis no RabbitMQ, sendo elas:

  • direct: a mensagem é enviada diretamente para a fila conectada à exchange, cuja chave é exatamente igual à informada;
  • fanout: a mensagem é enviada para todas as filas conectadas à exchange;
  • topic: a mensagem é enviada para todas as filas conectadas à exchange, cuja chave respeita o padrão configurado.

Producer

Para produzir a mensagem iremos expor 3 endpoints para que possamos criar nossas mensagens, sendo eles:

  1. POST /messages/direct?name=nome_da_fila
  2. POST /messages/topic?name=pattern_da_fila
  3. POST /messages/fanout

De modo a conseguirmos personalizar a fila para qual vamos enviar as mensagens no caso das 2 primeiras, receberemos o nome via query param.

O conteúdo da mensagem deverá ser enviado no corpo da requisição respeitando o seguinte contrato:

{
	"content": "conteúdo da mensagem",
    "date": "2023-01-01T10:00:00"
}

Consumer

Teremos 2 consumidores que irão receber a mensagem e salvar num arquivo texto, cada consumidor irá mostrar um log com a seguinte estrutura: consumer-1|data_envio|data_erecebimento|tipo|conteudo

Setup

Para uma melhor organização foi criada um adaptador para a biblioteca amqplib, essa classe tem como objetivo adapter a interface da amqplib para nossa interface Queue, que contém apenas os métodos necessários para enviar e consumir mensagens.

import { Queue } from "./Queue";
import amqp from "amqplib";

export class RabbitMQQueueAdapter implements Queue {

    connection: amqp.Connection | undefined;

    constructor(private exchange: string) {

    }

    async connect(): Promise<void> {
        this.connection = await amqp.connect("amqp://rabbitmq:rabbitmq@rabbit");
    }


    async close(): Promise<void> {
        if (!this.connection) {
            throw new Error("connection closed");
        }
        this.connection.close();
    }

    async publish(exchangeName: string, queue: string, payload: any): Promise<void> {
        if (!this.connection) throw new Error("connection closed");
        const channel = await this.connection.createChannel();
        await channel.checkExchange(this.exchange);
        channel.publish(exchangeName, queue, Buffer.from(JSON.stringify(payload)));
    }

    async sendToQueue(queue: string, payload: any): Promise<void> {
        if (!this.connection) throw new Error("connection closed");
        const channel = await this.connection.createChannel();
        await channel.assertQueue(queue, { durable: true });
        channel.sendToQueue(queue, Buffer.from(JSON.stringify(payload)));
    }

    async consume(queueName: string, callback: any): Promise<void> {
        if (!this.connection) throw new Error("connection closed");
        const channel = await this.connection.createChannel();
        await channel.assertQueue(queueName, { durable: true });
        await channel.consume(queueName, async function (msg: any) {
            await callback(JSON.parse(msg.content.toString()));
        }, {
            noAck: true
        });
    }
}

Destaco aqui os métodos publish e consume, iremos utilizar o primeiro para enviar mensagens à nossa exchange e o segundo para consumir mensagens existentes na fila selecionada.

O método sendToQueue é utilizado para ignorar o roteamento da exchange e enviar uma mensagem diretamente à fila desejada. Não utilizaremos ele nesse exemplo.

Producer

Em nosso exemplo, a classe responsável por montar a mensagem e chamar o método publish é a ProduceMessageUseCase:

import { Message } from "../domain/entity/Message";
import { Queue } from "../infra/queue/Queue";

export class ProduceMessageUseCase {

    types = {
        direct: "directExchange",
        topic: "topicExchange",
        fanout: "fanoutExchange"
    }

    constructor(private queue: Queue) {

    }

    async execute(input: ProduceMessageInputDto): Promise<void> {
        const message = new Message(input.origin, input.content, input.date);
        const exchangeName = this.types[input.origin] ?? this.types.fanout;
        await this.queue.publish(exchangeName, input.name, message.getContent());
    }
}

export type ProduceMessageInputDto = {
    origin: "topic" | "direct" | "fanout";
    name: string;
    content: string;
    date: Date;
}

A mágica acontece nesse trecho await this.queue.publish(exchangeName, input.name, message.getContent()); onde estamos efetivamente enviando a mensagem para nosso message broker especificando o exchange desejado.

Consumer

Por fim os consumidores das mensagens que separamos em 2  arquivos simples: controller - que contém os comandos para ler da fila - e handler - que contém efetivamente a lógica do que faremos com a mensagem:

// Consumer1Controller
import { Consumer1Handler } from "../handler/Consumer1Handler";
import { Queue } from "../queue/Queue";

export class Consumer1Controller {
    constructor(queue: Queue) {
        queue.consume("fila1", async function (msg: any) {
            const handler = new Consumer1Handler();
            await handler.handle(msg);
        })
    }
}


// Consumer1Handler
export class Consumer1Handler {
    async handle(message: any) {
        console.log(`consumer-1|${message.sentAt}|${(new Date()).toISOString()}|${message.origin}|${message.content}`);
    }
}

O método consume será chamado cada vez que uma nova mensagem chegar à fila1 e chamará o método handle que poderá fazer o que for necessário com ela, nesse caso, ele apenas organiza e mostra as informações em tela com o prefixo consumer-1

Resultados

Após tudo configurado e, com a aplicação rodando fizemos os seguintes experimentos:

  1. Requisição ao endpoint http://localhost:3001/messages/direct?name=fila1
Resultado consumo exchange direct

2. Requisição ao endpoint http://localhost:3001/messages/topic?name=fila.logs

Resultado consumo exchange topic

3. Requisição ao endpoint http://localhost:3001/messages/fanout

Resultado consumo exchange fanout

Conclusão

Message brokers, e sistemas de mensageria em geral, possuem uma estrutura robusta e são ferramentas excelentes para comunicação em sistemas distribuídos, principalmente onde resiliência e garantia de entrega são características importantes.

No entanto vale ressaltar que, apesar da implementação parecer simples, é importante lembrar que não existe bala de prata e sempre considerar os trade-offs antes de implementar esse tipo de solução, uma vez que acrescenta uma complexidade que, muitas vezes é desnecessária.

Confira o projeto na íntegra em https://github.com/virb30/rabbitmq-ts