Vinicius Quinafelex Alves

🌐English version

[C#] Consumir fila RabbitMQ com assincronia e paralelismo

Assincronicidade e paralelismo são ótimas ferramentas em situações em que um processo gasta mais tempo esperando retorno de I/O do que fazendo processando. Usando async e paralelismo, o processo pode começar a processar novas requisições assim que a chamada de I/O é iniciada, diminuindo o tempo que ele fica ocioso e aumentando a quantidade de requisições processadas por segundo. Se o processo for um serviço, é possível processar a mesma quantidade de mensagem com uma quantidade menor de instâncias desse serviço.

RabbitMQ - Síncrono e paralelo

RabbitMQ permite que um consumer possa receber várias mensagems para ser processados em paralelo, usando a propriedade ConsumerDispatchConcurrency. Como todo processamento paralelo, é importante assegurar que o processamento de cada mensagem seja thread-safe.

O código abaixo configura a conexão para que ela possa enviar 5 mensagens ao mesmo tempo.

var factory = new ConnectionFactory() 
{ 
    HostName = "localhost", 
    ConsumerDispatchConcurrency = 5
};

this.RabbitConnection = factory.CreateConnection();
this.RabbitChannel = RabbitConnection.CreateModel();

var consumer = new EventingBasicConsumer(RabbitChannel);
consumer.Received += (model, ea) =>
{
    // Process message with thread-safety
};

RabbitChannel.BasicConsume(queue: "queueName", autoAck: false, consumer: consumer);

Quando o sistema usa BasicQos, é recomendado que a quantidade de mensagens seja no mínimo a quantidade configurada em ConsumerDispatchConcurrency. Caso contrário, a library criará threads que nunca serão utilizadas, pois nunca haverá mensagens o suficiente para ativar elas.

int concurrencyLevel = 5;
            
var factory = new ConnectionFactory() 
{ 
    HostName = "localhost", 
    ConsumerDispatchConcurrency = concurrencyLevel
};

this.RabbitConnection = factory.CreateConnection();
this.RabbitChannel = RabbitConnection.CreateModel();
this.RabbitChannel.BasicQos(0, concurrencyLevel, false);

RabbitMQ - Assíncrono e paralelo

Chamadas assíncronas funcionam da mesma forma, a diferença é que a ConnectionFactory deve mudar a propriedade DispatchConsumersAsync, e o consumo das mensagens deve ser feita usando uma das classes fornecidas pela biblioteca que implementa IAsyncBasicConsumer.

var factory = new ConnectionFactory() 
{ 
    HostName = "localhost",
    DispatchConsumersAsync = true,
    ConsumerDispatchConcurrency = 5
};

this.RabbitConnection = factory.CreateConnection();
this.RabbitChannel = RabbitConnection.CreateModel();

var consumer = new AsyncEventingBasicConsumer(RabbitChannel);
consumer.Received += async (model, ea) =>
{
    // Process message with thread-safety
};

RabbitChannel.BasicConsume(queue: "queueName", autoAck: false, consumer: consumer);

Exemplo de BackgroundService usando async e paralelismo

using RabbitMQ.Client;
using RabbitMQ.Client.Events;

public class ExampleWorker : BackgroundService
{
    private IConnection? RabbitConnection;
    private IModel? RabbitChannel;

    public override Task StartAsync(CancellationToken cancellationToken)
    {
        int concurrencyLevel = 5;

        var factory = new ConnectionFactory() 
        { 
            HostName = "localhost",
            DispatchConsumersAsync = true,
            ConsumerDispatchConcurrency = concurrencyLevel
        };

        this.RabbitConnection = factory.CreateConnection();
        this.RabbitChannel = RabbitConnection.CreateModel();
        this.RabbitChannel.BasicQos(0, concurrencyLevel, false);

        var consumer = new AsyncEventingBasicConsumer(RabbitChannel);
        consumer.Received += async (model, ea) =>
        {
            // Process message with thread-safety
        };

        RabbitChannel.BasicConsume(queue: "queueName", autoAck: false, consumer: consumer);

        return Task.CompletedTask;
    }

    protected override Task ExecuteAsync(CancellationToken stoppingToken) => Task.CompletedTask;

    public override Task StopAsync(CancellationToken cancellationToken)
    {
        RabbitChannel?.Dispose();
        RabbitConnection?.Dispose();

        return Task.CompletedTask;
    }
}