Sistemas de Mensageria
Mensageria ou messaging é uma forma de comunicação presente em sistemas distribuídos realizada através da troca de mensagens (eventos), sendo essas mensagens gerenciadas por um message broker.
Mas... o que é um message broker?
De forma resumida, message broker é um intermediador: um componente que responsável por intermediar a troca de mensagens entre produtores (producers) e consumidores (consumers).
Producers e consumers, resumidamente, são sistemas de software que, respectivamente, enviam e recebem mensagens para/do message broker
Fazendo uma analogia, um carteiro (message broker) é responsável por garantir que uma carta saia do remetente (producer) e chegue até o destinatário (consumer).
Componentes básicos do message broker
Existem diversos message brokers disponíveis no mercado, dentre eles, podemos citar como mais populares: RabbitMQ, Apache Kafka, Amazon SQS, Google Cloud Pub/Sub, dentre outros. No entanto, independente do fornecedor, normalmente encontraremos os seguintes elementos:
- Producer - sistema que irá enviar (produzir) uma mensagemm
- Consumer - sistema que irá consumir mensagens de uma fila
- Queue ou topic - fila (ou tópico) para a qual a mensagem será enviada e armazenada
- Exchange - Componente responsável por rotear as mensagens para suas respectivas filas e garantir uma distribuição uniforme entre os consumidores
- Message ou event - A mensagem que está sendo transmitida entre produtor e consumidor.
Vantagens e Desvantagens em utilizar mensageria
Vantagens
- A troca de mensagens entre produtor e consumidor pode ocorrer independente se o um ou outro estiver online, o message broker se encarregará de entregar a mensagem assim que o consumidor ficar ativo novamente.
- Message brokers são capazes de garantir a entrega da mensagem, tornando o sistema mais confiável.
- O processamento assíncrono ajuda a melhorar a performance da aplicação e experiência do usuário.
- Message brokers são capazes de reenviar mensagens não entregues além de manter um registro das falhas para análise futura.
Desvantagens
- A consistência entre os sistemas (produtor e consumidor) é eventual, ou seja, em determinado momento poderá haver inconsistência nos dados entre os sistemas.
Tá na hora do Código
Nota: Todos os exemplos a seguir utilizarão Rabbit MQ como message broker e typescript como linguagem de programação.
Nota2: Focaremos na implementação dos producers e consumers, portanto não entraremos em detalhes sobre a implementação das demais dependências nem a organização do código.
O que iremos fazer?
Para nosso exemplo iremos construir uma API que atuará como producer e consumer de mensagens. Iremos explorar todos os tipos de exchange disponíveis no RabbitMQ, sendo elas:
- direct: a mensagem é enviada diretamente para a fila conectada à exchange, cuja chave é exatamente igual à informada;
- fanout: a mensagem é enviada para todas as filas conectadas à exchange;
- topic: a mensagem é enviada para todas as filas conectadas à exchange, cuja chave respeita o padrão configurado.
Producer
Para produzir a mensagem iremos expor 3 endpoints para que possamos criar nossas mensagens, sendo eles:
POST /messages/direct?name=nome_da_fila
POST /messages/topic?name=pattern_da_fila
POST /messages/fanout
De modo a conseguirmos personalizar a fila para qual vamos enviar as mensagens no caso das 2 primeiras, receberemos o nome via query param.
O conteúdo da mensagem deverá ser enviado no corpo da requisição respeitando o seguinte contrato:
{
"content": "conteúdo da mensagem",
"date": "2023-01-01T10:00:00"
}
Consumer
Teremos 2 consumidores que irão receber a mensagem e salvar num arquivo texto, cada consumidor irá mostrar um log com a seguinte estrutura: consumer-1|data_envio|data_erecebimento|tipo|conteudo
Setup
Para uma melhor organização foi criada um adaptador para a biblioteca amqplib, essa classe tem como objetivo adapter a interface da amqplib para nossa interface Queue, que contém apenas os métodos necessários para enviar e consumir mensagens.
import { Queue } from "./Queue";
import amqp from "amqplib";
export class RabbitMQQueueAdapter implements Queue {
connection: amqp.Connection | undefined;
constructor(private exchange: string) {
}
async connect(): Promise<void> {
this.connection = await amqp.connect("amqp://rabbitmq:rabbitmq@rabbit");
}
async close(): Promise<void> {
if (!this.connection) {
throw new Error("connection closed");
}
this.connection.close();
}
async publish(exchangeName: string, queue: string, payload: any): Promise<void> {
if (!this.connection) throw new Error("connection closed");
const channel = await this.connection.createChannel();
await channel.checkExchange(this.exchange);
channel.publish(exchangeName, queue, Buffer.from(JSON.stringify(payload)));
}
async sendToQueue(queue: string, payload: any): Promise<void> {
if (!this.connection) throw new Error("connection closed");
const channel = await this.connection.createChannel();
await channel.assertQueue(queue, { durable: true });
channel.sendToQueue(queue, Buffer.from(JSON.stringify(payload)));
}
async consume(queueName: string, callback: any): Promise<void> {
if (!this.connection) throw new Error("connection closed");
const channel = await this.connection.createChannel();
await channel.assertQueue(queueName, { durable: true });
await channel.consume(queueName, async function (msg: any) {
await callback(JSON.parse(msg.content.toString()));
}, {
noAck: true
});
}
}
Destaco aqui os métodos publish
e consume
, iremos utilizar o primeiro para enviar mensagens à nossa exchange e o segundo para consumir mensagens existentes na fila selecionada.
O método sendToQueue
é utilizado para ignorar o roteamento da exchange e enviar uma mensagem diretamente à fila desejada. Não utilizaremos ele nesse exemplo.
Producer
Em nosso exemplo, a classe responsável por montar a mensagem e chamar o método publish é a ProduceMessageUseCase
:
import { Message } from "../domain/entity/Message";
import { Queue } from "../infra/queue/Queue";
export class ProduceMessageUseCase {
types = {
direct: "directExchange",
topic: "topicExchange",
fanout: "fanoutExchange"
}
constructor(private queue: Queue) {
}
async execute(input: ProduceMessageInputDto): Promise<void> {
const message = new Message(input.origin, input.content, input.date);
const exchangeName = this.types[input.origin] ?? this.types.fanout;
await this.queue.publish(exchangeName, input.name, message.getContent());
}
}
export type ProduceMessageInputDto = {
origin: "topic" | "direct" | "fanout";
name: string;
content: string;
date: Date;
}
A mágica acontece nesse trecho await this.queue.publish(exchangeName, input.name, message.getContent());
onde estamos efetivamente enviando a mensagem para nosso message broker especificando o exchange desejado.
Consumer
Por fim os consumidores das mensagens que separamos em 2 arquivos simples: controller - que contém os comandos para ler da fila - e handler - que contém efetivamente a lógica do que faremos com a mensagem:
// Consumer1Controller
import { Consumer1Handler } from "../handler/Consumer1Handler";
import { Queue } from "../queue/Queue";
export class Consumer1Controller {
constructor(queue: Queue) {
queue.consume("fila1", async function (msg: any) {
const handler = new Consumer1Handler();
await handler.handle(msg);
})
}
}
// Consumer1Handler
export class Consumer1Handler {
async handle(message: any) {
console.log(`consumer-1|${message.sentAt}|${(new Date()).toISOString()}|${message.origin}|${message.content}`);
}
}
O método consume
será chamado cada vez que uma nova mensagem chegar à fila1
e chamará o método handle
que poderá fazer o que for necessário com ela, nesse caso, ele apenas organiza e mostra as informações em tela com o prefixo consumer-1
Resultados
Após tudo configurado e, com a aplicação rodando fizemos os seguintes experimentos:
- Requisição ao endpoint
http://localhost:3001/messages/direct?name=fila1
2. Requisição ao endpoint http://localhost:3001/messages/topic?name=fila.logs
3. Requisição ao endpoint http://localhost:3001/messages/fanout
Conclusão
Message brokers, e sistemas de mensageria em geral, possuem uma estrutura robusta e são ferramentas excelentes para comunicação em sistemas distribuídos, principalmente onde resiliência e garantia de entrega são características importantes.
No entanto vale ressaltar que, apesar da implementação parecer simples, é importante lembrar que não existe bala de prata e sempre considerar os trade-offs antes de implementar esse tipo de solução, uma vez que acrescenta uma complexidade que, muitas vezes é desnecessária.
Confira o projeto na íntegra em https://github.com/virb30/rabbitmq-ts