学习rabbitMq中的消息确认和持久化机制

生产端消息确认#

tx机制#

tx机制叫做事务机制,RabbitMQ中有三个与tx机制的方法:txSelect()txCommit()txRollback()

  • channel.txSelect(): 用于将当前channel设置成transaction模式
  • channel.txCommit() :提交事务
  • channel.txRollback() :回滚事务

使用 tx 机制,首先要通过txSelect 方法开启事务,然后发布消息给 broker 服务器,如果 txCommit 提交成功,则说明消息成功被 broker 接收;如果在txCommit 执行之前 broker 异常崩溃或者由于其他原因抛出异常,这个时候可以捕获异常,通过 txRollback 回滚事务。

 /// <summary>
 /// 生产端消息确认(tx事务机制)
 /// </summary>
 [TestMethod]
 public void PublisherTest_Transaction() 
 {
     channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);

     string message = "hello world";
     byte[] messageBody = Encoding.UTF8.GetBytes(message);
     try
     {
         // 开启tx事务机制
         channel.TxSelect();

         // 消息发送
         channel.BasicPublish(exchange: "", routingKey: queueName, basicProperties: null, body: messageBody);

         // 事务提交
         channel.TxCommit();
     }
     catch (Exception ex)
     {
         // 事务回滚
         channel.TxRollback();
         Assert.Fail(ex.Message);
     }
 }

Confirm模式#

C#的RabbitMQ API中,有三个与Confirm相关的方法:ConfirmSelect()WaitForConfirms()WaitForConfirmOrDie

  • channel.ConfirmSelect() :表示开启Confirm模式
  • channel.WaitForConfirms() :等待所有消息确认,如果所有的消息都被服务端成功接收返回true,只要有一条没有被成功接收就返回false
  • channel.WaitForConfirmsOrDie()WaitForConfirms作用类型,也是等待所有消息确认。区别在于该方法没有返回值(Void),如果有任意一条消息没有被成功接收,该方法会立即抛出OperationInterrupedException类型异常
/// <summary>
/// 生产端消息确认(Confirm模式)
/// </summary>
[TestMethod]
public void PublisherTest_Confirm()
{
    channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);

    string message = "hello world";
    byte[] messageBody = Encoding.UTF8.GetBytes(message);

    // 开启Confirm模式
    channel.ConfirmSelect();

    // 消息发送
    channel.BasicPublish(exchange: "", routingKey: queueName, basicProperties: null, body: messageBody);

    // WaitForConfirms确认消息(可以同时确认多条消息)是否发送成功
    if (channel.WaitForConfirms())
    {
        Console.WriteLine($"Message发送成功");
    }
    else
    {
        Assert.Fail();
    }
}

消费端消息确认#

自动确认#

当RabbbitMQ将消息发送给消费者后,消费者接收到消息后,不等待消息处理结束,立即自动回送一个确认回执。自动确认的用法十分简单,设置消费方法的参数 autoAck 为 true 即可

EventingBasicConsumer consumer = new EventingBasicConsumer(channel);

consumer.Received += (model, ea) =>
{
    string message =
           Encoding.UTF8.GetString(ea.Body.ToArray());
};
channel.BasicConsume(queue: "hello", autoAck: true, consumer: consumer);

可能存在的问题:

  1. 丢失数据:Broker会在接收到确认回执时删除消息,如果消费者接收到消息并返回了确认回执,然后这个消费者在处理消息时挂了,那么这条消息就再也找不回来了
  2. 只要队列不空,RabbitMQ会源源不断的把消息推送给客户端,而不管客户端能否消费的完,如果其中一个消费端消费的较慢,会极大的浪费性能

手动确认(BasicAck)#

消费从队列中获取消息后,服务器会将该消息处于不可用状态,等待消费者反馈。Resume方法的参数autoAck设置为false,然后在消费端使用代码 channel.BasicAck()/BasicReject()等方法来确认和拒绝消息即可实现手动确认

EventingBasicConsumer consumer = new EventingBasicConsumer(channel);

consumer.Received += (model, ea) =>
{
    string message =
           Encoding.wUTF8.GetString(ea.Body.ToArray());
	 // 手动ack
    channel.BasicAck(
        deliveryTag: ea.DeliveryTag,
        multiple: false);
};

channel.BasicConsume(queue: "hello",
    autoAck: false,
    consumer: consumer);

改为手动确认方式只需改两处

  1. 开启监听时将 autoAck 参数改为 false
  2. 消息消费成功后返回确认

这段代码中,先处理消息,成功后再做 ack响应,失败就不做 ack响应,这样消息会储存在MQUnacked消息里,不会丢失,看起来没啥问题,但是如果其中一条消息在处理时抛出了异常,将导致后续所有消息都会无法消费。

消息拒绝#

BasicNack()#

BasicReject()不同的是同时支持多个消息,可以nack 该消费者先前接收未ack 的所有消息

EventingBasicConsumer consumer =
    new EventingBasicConsumer(channel);

consumer.Received += (model, ea) =>
{
    string message =
           Encoding.UTF8.GetString(ea.Body.ToArray());
    try
    {
        /* 消费到某条消息时出错
         * 导致Broker无法拿到正常回执信息引发后续消息都无法被正常消费
         * 如果MQ没得到ack响应,这些消息会堆积在Unacked消息里,不会丢弃,直至客户端断开重连时,才变回ready
         * 如果Consumer客户端不断开连接,这些Unacked消息,永远不会变回ready状态
         * Unacked消息多了,占用内存越来越大,就会异常
         */
        MessageConsumer(ea);
        channel.BasicAck(
                   deliveryTag: ea.DeliveryTag,
                   multiple: false);
    }
    catch (Exception ex)
    {
        // 出错了,发nack,并通知MQ把消息塞回的队列头部(不是尾部)
        channel.BasicNack(
            deliveryTag: ea.DeliveryTag,
            multiple: false,
            requeue: true);
    }
};

channel.BasicConsume(queue: "hello",
    autoAck: false,
    consumer: consumer);

这里将代码调整为消费正常就 ack,不正常就nack,并等下一次重新消费。看起来没问题,但是如果某条消息在消费时又抛出异常,该消息将会被Nack机制重新扔回 队列头部,下一步又消费这条会出异常的消息,又出错,塞回队列……进入死循环,所以要谨慎使用Nack机制。这里可以在catch中记录错误日志依旧使用ack确认消费。

BasicReject()#

消费端告诉服务器这个消息拒绝接收,不处理,可以设置是否放回到队列中还是丢掉(只能一次拒绝一个消息)

 MessagePublisher("hello", $"1");
 MessagePublisher("hello", $"2");
 MessagePublisher("hello", $"3");

 channel.QueueDeclare(
     queue: "hello",
     durable: false,
     exclusive: false,
     autoDelete: false,
     arguments: null);

 EventingBasicConsumer consumer =
     new EventingBasicConsumer(channel);

 channel.BasicQos(0, 1, false);

 consumer.Received += (model, ea) =>
 {
     string message =
            Encoding.UTF8.GetString(ea.Body.ToArray());

     if (message == "2")
     {
         Console.WriteLine($"Message:{message}");
         channel.BasicAck(
             deliveryTag: ea.DeliveryTag,
             multiple: false);
     }
     else
     {
         Console.WriteLine($"拒绝处理");
         /* BasicReject用于拒绝消息
            requeue参数指定了拒绝后是否重新放回queue
            一次只能拒绝一条消息
            设置为true: 消息会被重新仍回queue中
            设置为false:消息将被丢弃
         */
         channel.BasicReject(
             deliveryTag: ea.DeliveryTag,
             requeue: true);
     }
 };

 channel.BasicConsume(queue: "hello",
     autoAck: false,
     consumer: consumer);

BasicRecover()#

路由不成功的消息可以使用recovery重新发送到队列中,参数是是否requeue,true则重新入队列,并且尽可能的将之前recover的消息投递给其他消费者消费,而不是自己再次消费。false则消息会重新被投递给自己

消息持久化 Persistent#

参数重启RabbitMQ
exchange.durable=fasle/queue.durable=falseexchange/queue将会被丢弃
exchange.durable=fasleexchange将会被丢弃
queue.durable=faslequeue将会被丢弃
exchange.durable=fasle/queue.durable=trueexchange将会被丢弃,queue虽然会存在,但队列内消息会全部丢失
exchange.durable=true/queue.durable=trueexchange/queue会存在,但队列内消息会全部丢失
exchange.durable=true&&queue.durable=true/消息发布时(persistent=true)消息真正的持久化
for (int i = 0; i < 100; i++)
{
    byte[] messageBody = Encoding.UTF8.GetBytes(i.ToString());

    // 设置消息持久化
    var props = channel.CreateBasicProperties();
    props.Persistent = true;

    // 消息发送
    channel.BasicPublish(
        exchange: "TestExchange",
        routingKey: "",
        basicProperties: props,
        body: messageBody);
}

消息优先级 Priority#

queue是先进先出的,即先发送的消息先被消费。但是在具体业务中可能会遇到要提前处理某些消息的需求,如一个常见的需求:普通客户的消息按先进先出的顺序处理,vip客户的消息要提前处理。消息实现优先级控制的实现方式是:首先在声明queue是设置队列的x-max-priority属性,然后在publish消息时,设置消息的优先级等级即可

 /// <summary>
 /// 消息优先级
 /// </summary>
 [TestMethod]
 public void PublisherTest_Priority()
 {
     channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false,
                        arguments: new Dictionary<string, object>() {
                        // 队列优先级最高为10,不加x-max-priority的话,消息发布时设置了消息的优先级也不会生效
                        {"x-max-priority",10 }
                        });

     // 测试数据
     string[] msgs = { "vip1", "hello1", "hello2", "hello3", "vip5" };

     // 设置消息优先级
     IBasicProperties props = channel.CreateBasicProperties();
     foreach (string msg in msgs)
     {
         // vip开头的消息,优先级设置为9,其他消息优先级为1
         if (msg.StartsWith("vip"))
             props.Priority = 9;
         else
             props.Priority = 1;

         channel.BasicPublish(exchange: "",
                                routingKey: queueName,
                                basicProperties: props,
                                body: Encoding.UTF8.GetBytes(msg));
     }
     Assert.IsTrue(true);
 }