[C#] Consumir fila RabbitMQ com assincronia e paralelismo
Assincronicidade e paralelismo são ótimas ferramentas em situações em que um processo gasta mais tempo esperando retorno de I/O do que fazendo processando. Usando async e paralelismo, o processo pode começar a processar novas requisições assim que a chamada de I/O é iniciada, diminuindo o tempo que ele fica ocioso e aumentando a quantidade de requisições processadas por segundo. Se o processo for um serviço, é possível processar a mesma quantidade de mensagem com uma quantidade menor de instâncias desse serviço.
RabbitMQ - Síncrono e paralelo
RabbitMQ permite que um consumer possa receber várias mensagems para ser processados em paralelo, usando a propriedade ConsumerDispatchConcurrency. Como todo processamento paralelo, é importante assegurar que o processamento de cada mensagem seja thread-safe.
O código abaixo configura a conexão para que ela possa enviar 5 mensagens ao mesmo tempo.
var factory = new ConnectionFactory()
{
HostName = "localhost",
ConsumerDispatchConcurrency = 5
};
this.RabbitConnection = factory.CreateConnection();
this.RabbitChannel = RabbitConnection.CreateModel();
var consumer = new EventingBasicConsumer(RabbitChannel);
consumer.Received += (model, ea) =>
{
// Process message with thread-safety
};
RabbitChannel.BasicConsume(queue: "queueName", autoAck: false, consumer: consumer);
Quando o sistema usa BasicQos, é recomendado que a quantidade de mensagens seja no mínimo a quantidade configurada em ConsumerDispatchConcurrency. Caso contrário, a library criará threads que nunca serão utilizadas, pois nunca haverá mensagens o suficiente para ativar elas.
int concurrencyLevel = 5;
var factory = new ConnectionFactory()
{
HostName = "localhost",
ConsumerDispatchConcurrency = concurrencyLevel
};
this.RabbitConnection = factory.CreateConnection();
this.RabbitChannel = RabbitConnection.CreateModel();
this.RabbitChannel.BasicQos(0, concurrencyLevel, false);
RabbitMQ - Assíncrono e paralelo
Chamadas assíncronas funcionam da mesma forma, a diferença é que a ConnectionFactory deve mudar a propriedade DispatchConsumersAsync, e o consumo das mensagens deve ser feita usando uma das classes fornecidas pela biblioteca que implementa IAsyncBasicConsumer.
var factory = new ConnectionFactory()
{
HostName = "localhost",
DispatchConsumersAsync = true,
ConsumerDispatchConcurrency = 5
};
this.RabbitConnection = factory.CreateConnection();
this.RabbitChannel = RabbitConnection.CreateModel();
var consumer = new AsyncEventingBasicConsumer(RabbitChannel);
consumer.Received += async (model, ea) =>
{
// Process message with thread-safety
};
RabbitChannel.BasicConsume(queue: "queueName", autoAck: false, consumer: consumer);
Exemplo de BackgroundService usando async e paralelismo
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
public class ExampleWorker : BackgroundService
{
private IConnection? RabbitConnection;
private IModel? RabbitChannel;
public override Task StartAsync(CancellationToken cancellationToken)
{
int concurrencyLevel = 5;
var factory = new ConnectionFactory()
{
HostName = "localhost",
DispatchConsumersAsync = true,
ConsumerDispatchConcurrency = concurrencyLevel
};
this.RabbitConnection = factory.CreateConnection();
this.RabbitChannel = RabbitConnection.CreateModel();
this.RabbitChannel.BasicQos(0, concurrencyLevel, false);
var consumer = new AsyncEventingBasicConsumer(RabbitChannel);
consumer.Received += async (model, ea) =>
{
// Process message with thread-safety
};
RabbitChannel.BasicConsume(queue: "queueName", autoAck: false, consumer: consumer);
return Task.CompletedTask;
}
protected override Task ExecuteAsync(CancellationToken stoppingToken) => Task.CompletedTask;
public override Task StopAsync(CancellationToken cancellationToken)
{
RabbitChannel?.Dispose();
RabbitConnection?.Dispose();
return Task.CompletedTask;
}
}