Vinicius Quinafelex Alves

[C#] Consuming RabbitMQ queue with async and parallelism

Asynchronicity and parallelism are great tools when a process spends more time waiting for I/O calls than doing computation. With async and parallel processing, the process can start handling new requests as soon as an I/O call begins, which reduces idle time and increases potential throughput. When the process is a service, it can handle the same volume of messages with fewer service instances.
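
To make the idle-time argument concrete, here is a minimal sketch that uses only the base class library. Task.Delay stands in for an I/O call, and the IoBoundDemo class with its method names is a hypothetical illustration, not part of the RabbitMQ client.

```csharp
using System;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;

public static class IoBoundDemo
{
    // Stand-in for an I/O call (database, HTTP, ...): waits without occupying a thread.
    public static Task SimulatedIoAsync() => Task.Delay(200);

    // Handles the messages one after another: each wait starts only when the previous one ends.
    public static async Task<TimeSpan> RunSequentialAsync(int messages)
    {
        var watch = Stopwatch.StartNew();
        for (int i = 0; i < messages; i++)
        {
            await SimulatedIoAsync();
        }
        return watch.Elapsed;
    }

    // Starts every handler before awaiting any of them, so the waits overlap.
    public static async Task<TimeSpan> RunConcurrentAsync(int messages)
    {
        var watch = Stopwatch.StartNew();
        await Task.WhenAll(Enumerable.Range(0, messages).Select(_ => SimulatedIoAsync()));
        return watch.Elapsed;
    }

    public static async Task Main()
    {
        TimeSpan sequential = await RunSequentialAsync(5);
        TimeSpan concurrent = await RunConcurrentAsync(5);
        Console.WriteLine($"Sequential: {sequential.TotalMilliseconds:F0} ms");
        Console.WriteLine($"Concurrent: {concurrent.TotalMilliseconds:F0} ms");
    }
}
```

With five simulated calls, the sequential run has to sit through each wait in turn, while the concurrent run overlaps them, so its total time should stay close to that of a single call.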

RabbitMQ - Synchronous and parallel

The RabbitMQ .NET client lets a consumer process multiple messages in parallel through the ConsumerDispatchConcurrency property of ConnectionFactory. Because handlers can then run on several threads at once, make sure the message handler is thread-safe.

The code below configures the connection to dispatch up to 5 messages at the same time.

var factory = new ConnectionFactory() 
{ 
    HostName = "localhost", 
    ConsumerDispatchConcurrency = 5
};

this.RabbitConnection = factory.CreateConnection();
this.RabbitChannel = RabbitConnection.CreateModel();

var consumer = new EventingBasicConsumer(RabbitChannel);
consumer.Received += (model, ea) =>
{
    // Process the message in a thread-safe way, then acknowledge it (autoAck is false)
};

RabbitChannel.BasicConsume(queue: "queueName", autoAck: false, consumer: consumer);
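
As a rough illustration of what "thread-safe" can mean for the handler, the sketch below keeps shared counters with Interlocked and ConcurrentDictionary. The MessageStats class and its members are hypothetical, and Parallel.For is only an assumption standing in for the client dispatching to 5 handler threads; in a real consumer, Record would be called from the Received callback.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical shared state touched by every handler invocation.
public class MessageStats
{
    private int processed;
    private readonly ConcurrentDictionary<string, int> perType =
        new ConcurrentDictionary<string, int>();

    public int Processed => Volatile.Read(ref processed);

    public int CountFor(string messageType) =>
        perType.TryGetValue(messageType, out int count) ? count : 0;

    // Safe to call from several threads at once.
    public void Record(string messageType)
    {
        Interlocked.Increment(ref processed);
        perType.AddOrUpdate(messageType, 1, (_, current) => current + 1);
    }
}

public static class ThreadSafetyDemo
{
    public static MessageStats Run(int messages)
    {
        var stats = new MessageStats();
        var options = new ParallelOptions { MaxDegreeOfParallelism = 5 };

        // Stand-in for concurrent dispatch: up to 5 "handlers" run at the same time.
        Parallel.For(0, messages, options, i => stats.Record(i % 2 == 0 ? "even" : "odd"));
        return stats;
    }

    public static void Main()
    {
        MessageStats stats = Run(10_000);
        int even = stats.CountFor("even");
        int odd = stats.CountFor("odd");
        Console.WriteLine($"{stats.Processed} processed, {even} even, {odd} odd");
    }
}
```

Replacing the Interlocked call with a plain `processed++` would make the total unreliable under concurrency, which is the kind of bug the thread-safety warning is about.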

When the software uses BasicQos, set the prefetch count to at least the configured ConsumerDispatchConcurrency. Otherwise the consumer never holds enough unacknowledged messages to keep all the configured threads busy.

int concurrencyLevel = 5;
            
var factory = new ConnectionFactory() 
{ 
    HostName = "localhost", 
    ConsumerDispatchConcurrency = concurrencyLevel
};

this.RabbitConnection = factory.CreateConnection();
this.RabbitChannel = RabbitConnection.CreateModel();
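// prefetchCount is a ushort, so the int variable needs an explicit cast
this.RabbitChannel.BasicQos(prefetchSize: 0, prefetchCount: (ushort)concurrencyLevel, global: false);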
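
The interaction between the prefetch count and the dispatch concurrency can be explored with a toy model that does not touch RabbitMQ at all. In this sketch, a SemaphoreSlim plays the role of the broker's prefetch limit and a set of worker tasks plays the role of the dispatch threads; PrefetchModel and all of its names are hypothetical, and the model is a simplification of how the client really schedules work.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class PrefetchModel
{
    // Returns the highest number of messages that were being handled at the same time.
    public static async Task<int> PeakInFlightAsync(int prefetchCount, int concurrency, int messages)
    {
        // Each semaphore slot models one delivered-but-unacknowledged message.
        using var unacked = new SemaphoreSlim(prefetchCount);
        var gate = new object();
        int remaining = messages, inFlight = 0, peak = 0;

        bool TryTakeMessage()
        {
            lock (gate)
            {
                if (remaining == 0) return false;
                remaining--;
                return true;
            }
        }

        async Task WorkerAsync()
        {
            while (TryTakeMessage())
            {
                await unacked.WaitAsync();          // wait until the "broker" may deliver
                lock (gate) { inFlight++; peak = Math.Max(peak, inFlight); }
                await Task.Delay(20);               // simulated handler work
                lock (gate) { inFlight--; }
                unacked.Release();                  // the "ack" frees a prefetch slot
            }
        }

        await Task.WhenAll(Enumerable.Range(0, concurrency).Select(_ => WorkerAsync()));
        return peak;
    }

    public static async Task Main()
    {
        int starved = await PeakInFlightAsync(prefetchCount: 2, concurrency: 5, messages: 20);
        int matched = await PeakInFlightAsync(prefetchCount: 5, concurrency: 5, messages: 20);
        Console.WriteLine($"prefetch 2, concurrency 5 -> peak {starved}");
        Console.WriteLine($"prefetch 5, concurrency 5 -> peak {matched}");
    }
}
```

In this model the peak can never exceed the smaller of the two settings, which is why a prefetch count below the concurrency level leaves some workers idle.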

RabbitMQ - Asynchronous and parallel

Asynchronous consumers work the same way. The differences are that the ConnectionFactory must set the DispatchConsumersAsync property to true, and the consumer must be one of the IAsyncBasicConsumer implementations the library provides, such as AsyncEventingBasicConsumer.

var factory = new ConnectionFactory() 
{ 
    HostName = "localhost",
    DispatchConsumersAsync = true,
    ConsumerDispatchConcurrency = 5
};

this.RabbitConnection = factory.CreateConnection();
this.RabbitChannel = RabbitConnection.CreateModel();

var consumer = new AsyncEventingBasicConsumer(RabbitChannel);
consumer.Received += async (model, ea) =>
{
    // Await the I/O-bound work in a thread-safe way, then acknowledge the message (autoAck is false)
};

RabbitChannel.BasicConsume(queue: "queueName", autoAck: false, consumer: consumer);
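
One detail worth keeping in mind for async handlers is that a C# lock statement cannot contain an await, so shared state that is updated across an await is often guarded with SemaphoreSlim instead. The following is a minimal sketch under that assumption; AsyncMessageHandler and HandleAsync are hypothetical names, and Task.Delay stands in for real I/O. In a real consumer, HandleAsync would be awaited inside the async Received callback.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical handler shared by all concurrent deliveries.
public class AsyncMessageHandler
{
    private readonly SemaphoreSlim gate = new SemaphoreSlim(1, 1);
    private readonly List<string> handled = new List<string>();   // not thread-safe by itself

    // Read this only after the pending HandleAsync calls have completed.
    public IReadOnlyList<string> Handled => handled;

    public async Task HandleAsync(string body)
    {
        await Task.Delay(50);            // simulated I/O, overlaps across messages

        await gate.WaitAsync();          // async-friendly mutual exclusion
        try
        {
            await Task.Delay(5);         // simulated async write to a shared resource
            handled.Add(body);           // only one handler at a time reaches this line
        }
        finally
        {
            gate.Release();
        }
    }
}

public static class AsyncHandlerDemo
{
    public static async Task Main()
    {
        var handler = new AsyncMessageHandler();

        // Five overlapping invocations, similar to ConsumerDispatchConcurrency = 5.
        await Task.WhenAll(Enumerable.Range(0, 5).Select(i => handler.HandleAsync($"message {i}")));
        Console.WriteLine($"Handled {handler.Handled.Count} messages");
    }
}
```

Keeping the guarded section as small as possible matters here: everything inside it runs one message at a time, so only the shared write should be protected, not the whole handler.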

Example of BackgroundService using async and parallelism

using RabbitMQ.Client;
using RabbitMQ.Client.Events;

public class ExampleWorker : BackgroundService
{
    private IConnection? RabbitConnection;
    private IModel? RabbitChannel;

    public override Task StartAsync(CancellationToken cancellationToken)
    {
        int concurrencyLevel = 5;

        var factory = new ConnectionFactory() 
        { 
            HostName = "localhost",
            DispatchConsumersAsync = true,
            ConsumerDispatchConcurrency = concurrencyLevel
        };

        this.RabbitConnection = factory.CreateConnection();
        this.RabbitChannel = RabbitConnection.CreateModel();
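        // prefetchCount is a ushort, so the int variable needs an explicit cast
        this.RabbitChannel.BasicQos(prefetchSize: 0, prefetchCount: (ushort)concurrencyLevel, global: false);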

        var consumer = new AsyncEventingBasicConsumer(RabbitChannel);
        consumer.Received += async (model, ea) =>
        {
            // Await the I/O-bound work in a thread-safe way, then acknowledge the message (autoAck is false)
        };

        RabbitChannel.BasicConsume(queue: "queueName", autoAck: false, consumer: consumer);

        return Task.CompletedTask;
    }

    protected override Task ExecuteAsync(CancellationToken stoppingToken) => Task.CompletedTask;

    public override Task StopAsync(CancellationToken cancellationToken)
    {
        RabbitChannel?.Dispose();
        RabbitConnection?.Dispose();

        return Task.CompletedTask;
    }
}